NiFi에서 Timestamp Formatting은 pattern을 지원하지 않는 문제로 인하여 밀리초 등이 잘리는 이슈가 있다. 따라서 이 문제를 해결하기 위해서 몇가지 고민들이 있었는데 가장 먼저 Avro에 대한 기술적인 이슈이다.

우선 Avro에서 모든 logical type은 다음과 같이 LogicalTypes에 static map으로 전역에 정의한다. 따라서 ClassLoader가 같다면 항상 같은 Class를 가지고 있게 되므로 Logical Type에는 이유야 timestamp pattern을 넣으면 안된다. 이것이 근본적인 이슈가 된다. Avro Schema에서 Logical Type을 확장해봐야 아무 의미가 없게 된다. 하지만 새로운 logical type 자체는 정의할 수 있다.

LogicalTypes.register(TimestampLogicalType.LOGICAL_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
    @Override
    public LogicalType fromSchema(Schema schema) {
        TimestampLogicalType logicalType = (TimestampLogicalType) schema.getLogicalType();
        return new TimestampLogicalType(logicalType.getTimestampPattern());
    }
});

새로운 Logical Type음 다음과 같이 정의할 수 있다.

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class TimestampLogicalType extends LogicalType {

    public static final String LOGICAL_TYPE_NAME = "timestamp";

    public TimestampLogicalType(String timestampPattern) {
        super(LOGICAL_TYPE_NAME);
    }

    @Override
    public void validate(Schema schema) {
        super.validate(schema);

        if (schema.getType() != Schema.Type.LONG) {
            throw new IllegalArgumentException("Logical type 'timestamp' must be backed by long");
        }
    }
}


Avro에서 다음과 같이 type, name, namespace, fields를 제외한 나머지 속성은 property로 정의할 수 있다. 따라서 Avro Schema에서 timestamp pattern을 넣을 가장 최적의 위치는 property로 정의하는 것이다. 이 값은 NiFi에서 파라미터값으로 key값을 지정하면 된다.

{
  "type": "record",
  "name": "hellowlrd",
  "namespace": "hello",
  "fields": [
    {
      "name": "type",
      "type": {
        "type": "long",
        "logicalType": "timestamp",
        "pattern": "yyyy-MM-dd HH:mm:ss.SSS"
      }
    }
  ],
  "properties": {
    "column.all.pattern": "yyyy-MM-dd HH:mm:ss.SSS",
    "column.type.pattern": "yyyy-MM-dd HH:mm:ss"
  }
}

이러한 스키마는 다음과 문자열로 생성할 수 있다.

HashMap<Object, Object> props = new HashMap<>();
props.put("column.all.pattern", "yyyy-MM-dd HH:mm:ss.SSS");
props.put("column.type.pattern", "yyyy-MM-dd HH:mm:ss");
Schema timestampMilliType = new TimestampLogicalType("yyyy-MM-dd HH:mm:ss.SSS").addToSchema(Schema.create(Schema.Type.LONG));
Schema s = SchemaBuilder.record("hellowlrd")
        .namespace("hello")
        .prop("properties", props)
        .fields().name("type").type(timestampMilliType).noDefault()
        .endRecord();

System.out.println(s);

반대로 다음과 같이 로딩하여 확인할 수 있다.

Schema schema = new Schema.Parser().parse(new File("src/test/resources/test.avro.json"));
Map<String, Object> objectProps = schema.getObjectProps();
System.out.println(objectProps.get("properties"));
System.out.println(schema);

따라서 NiFi에서 Timestamp의 밀리초, 마이크로초가 잘리는 문제, 그리고 포맷을 자유롭게 지정할 수 없는 문제를 해결하고자 다음의 방향성이 필요하다.

  • 포맷팅을 지원할 수 있는 logical type을 새로 만들어서 등록한다.
  • avro schema에서 property로 timestamp pattern을 정의한다.
  • nifi processor에서는 이 property의 timestamp pattern을 꺼내올 수 있는 key를 지정하도록 한다.
  • key의 형식은 column.<name>.pattern 형식으로 한다. name이 all이면 모든 timestamp를 동일하게 포맷팅하고, 컬럼명이 명시적으로 있으면 해당 컬럼에만 적용한다.
  • 레이블 없음

2 댓글

  1. Edward

    Tomcat에서는 FastDateFormat이라는 것을 만들어서 Caching 기능을 추가하여 별도로 변환하지 않고 이미 있는 것들을 그대로 리턴한다. → millis micro는 효과가 없을 것같고 yyyy-MM-dd yyyy-MM-dd HH:mm:ss 까지는 괜찮을것같다.

    https://github.com/apache/tomcat/blob/main/java/org/apache/tomcat/util/http/FastHttpDateFormat.java

    Commons Lang에 FastDateFormat은 Thread Safe하며 Fast하다.

    Caching 적용시 요즘 Caffeine을 쓰는 경우도 있다.

    GitHub - ben-manes/caffeine: A high performance caching library for Java

  2. Edward

    결론적으로 다음과 같이 적용함. 날짜 전체를 적용하거나, 특정 컬럼만 적용할수 있도록 properties 속성으로 정의하고 AvroTypeUtil, DataTypeUtil을 수정하는 것으로..

    {
      "type": "record",
      "name": "hellowlrd",
      "namespace": "hello",
      "fields": [
        {
          "name": "type",
          "type": {
            "type": "long",
            "logicalType": "timestamp"
          }
        }
      ],
      "properties": {
        "column.all.pattern": "yyyy-MM-dd HH:mm:ss.SSS",
        "column.type.pattern": "yyyy-MM-dd HH:mm:ss"
      }
    }